[[transactions]]
= Transactions

This section describes how Spring for Apache Kafka supports transactions.

[[overview]]
== Overview

The 0.11.0.0 client library added support for transactions.
Spring for Apache Kafka adds support in the following ways:

* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, and others)
* Transactional `KafkaMessageListenerContainer`
* Local transactions with `KafkaTemplate`
* Transaction synchronization with other transaction managers

Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
When the user calls `close()` on a producer, it is returned to the cache for reuse instead of actually being closed.
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer.
In previous versions of Spring for Apache Kafka, the `transactional.id` was generated differently for transactions started by a listener container with a record-based listener, to support fencing zombies.
This is no longer necessary, because `EOSMode.V2` is the only option starting with version 3.0.
For applications running with multiple instances, the `transactionIdPrefix` must be unique per instance.
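
The naming scheme described above can be modeled in a few lines of plain Java.
This is only an illustrative sketch: the class `TransactionalIdSketch` and the prefixes used with it are hypothetical, and it is not the code that `DefaultKafkaProducerFactory` actually runs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "transactionIdPrefix + n"; not part of Spring for Apache Kafka.
final class TransactionalIdSketch {

    private final String transactionIdPrefix;

    // n starts at 0 and is incremented for each new producer
    private final AtomicInteger suffix = new AtomicInteger();

    TransactionalIdSketch(String transactionIdPrefix) {
        this.transactionIdPrefix = transactionIdPrefix;
    }

    // Returns the transactional.id that the next new producer would receive in this model.
    String next() {
        return this.transactionIdPrefix + this.suffix.getAndIncrement();
    }

}
```

In this model, two application instances that share a prefix both produce `<prefix>0` for their first producer, which is why the prefix has to differ per instance.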

Also see xref:kafka/exactly-once.adoc[Exactly Once Semantics].

Also see xref:kafka/transactions.adoc#transaction-id-prefix[`transactionIdPrefix`].

With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property; Spring Boot automatically configures a `KafkaTransactionManager` bean and wires it into the listener container.

IMPORTANT: Starting with version 2.5.8, you can configure the `maxAge` property on the producer factory.
This is useful when using transactional producers that might lie idle for longer than the broker's `transactional.id.expiration.ms`.
With current `kafka-clients`, this can cause a `ProducerFencedException` without a rebalance.
When `maxAge` is set to less than `transactional.id.expiration.ms`, the factory refreshes a producer that is past its maximum age.
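
As a minimal sketch, `maxAge` could be set as follows.
The helper class `ProducerMaxAgeSketch` is hypothetical, and the six-day value assumes that the broker keeps the Kafka default of seven days for `transactional.id.expiration.ms`; adjust it to your broker configuration.

```java
import java.time.Duration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

// Hypothetical helper; only setMaxAge(...) is the framework API.
final class ProducerMaxAgeSketch {

    private ProducerMaxAgeSketch() {
    }

    static void configure(DefaultKafkaProducerFactory<String, String> producerFactory) {
        // Assumption: the broker expires idle transactional ids after 7 days (the Kafka default),
        // so cached producers are refreshed once they are older than 6 days.
        producerFactory.setMaxAge(Duration.ofDays(6));
    }

}
```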

[[using-kafkatransactionmanager]]
== Using `KafkaTransactionManager`

The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.
It is provided with a reference to the producer factory in its constructor.
If you provide a custom producer factory, it must support transactions.
See `ProducerFactory.transactionCapable()`.

You can use the `KafkaTransactionManager` with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, and others).
If a transaction is active, any `KafkaTemplate` operations performed within the scope of the transaction use the transaction's `Producer`.
The manager commits or rolls back the transaction, depending on success or failure.
You must configure the `KafkaTemplate` to use the same `ProducerFactory` as the transaction manager.
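
The following configuration is a sketch of that wiring, in which the template and the transaction manager share one transaction-capable producer factory.
The class names, the bootstrap address, the topic names, and the `tx-instance-1-` prefix are placeholders chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@Configuration
@EnableTransactionManagement
public class KafkaTxConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        // Setting a prefix makes the factory transaction-capable; it must differ per instance.
        pf.setTransactionIdPrefix("tx-instance-1-");
        return pf;
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> pf) {
        return new KafkaTransactionManager<>(pf);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        // Same factory as the transaction manager, so sends join the active transaction.
        return new KafkaTemplate<>(pf);
    }

}

@Service
class PairSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    PairSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Both sends use the transaction's producer and are committed or rolled back together.
    @Transactional("kafkaTransactionManager")
    public void sendPair(String first, String second) {
        this.kafkaTemplate.send("topic1", first);
        this.kafkaTemplate.send("topic2", second);
    }

}
```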

[[transaction-synchronization]]
== Transaction Synchronization

This section refers to producer-only transactions (transactions not started by a listener container); see xref:kafka/transactions.adoc#container-transaction-manager[Using Consumer-Initiated Transactions] for information about chaining transactions when the container starts the transaction.

If you want to send records to Kafka and perform some database updates, you can use normal Spring transaction management with, say, a `DataSourceTransactionManager`.

[source, java]
----
@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}
----

The interceptor for the `@Transactional` annotation starts the transaction and the `KafkaTemplate` will synchronize a transaction with that transaction manager; each send will participate in that transaction.
When the method exits, the database transaction will commit followed by the Kafka transaction.
If you wish the commits to be performed in the reverse order (Kafka first), use nested `@Transactional` methods, with the outer method configured to use the `DataSourceTransactionManager`, and the inner method configured to use the `KafkaTransactionManager`.
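
A sketch of the Kafka-first arrangement follows.
It assumes a `DataSourceTransactionManager` bean named `dstm` and a `KafkaTransactionManager` bean named `kafkaTransactionManager`; the service classes, table, and topic are hypothetical.
The two methods live in separate beans so that the inner call goes through the Spring transactional proxy.

```java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
class OrderService {

    private final JdbcTemplate jdbcTemplate;

    private final KafkaSender kafkaSender;

    OrderService(JdbcTemplate jdbcTemplate, KafkaSender kafkaSender) {
        this.jdbcTemplate = jdbcTemplate;
        this.kafkaSender = kafkaSender;
    }

    // Outer transaction: managed by the DataSourceTransactionManager (assumed bean name "dstm").
    @Transactional("dstm")
    public void process(String data) {
        this.jdbcTemplate.update("insert into mytable (data) values (?)", data);
        // The Kafka transaction commits when this call returns, before the database commit.
        this.kafkaSender.send(data);
    }

}

@Service
class KafkaSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Inner transaction: managed by the KafkaTransactionManager.
    @Transactional("kafkaTransactionManager")
    public void send(String data) {
        this.kafkaTemplate.send("topic", data);
    }

}
```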

See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.

NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.
Previously, this was silently ignored (logged at debug level).
Applications should take remedial action, if necessary, to compensate for the committed primary transaction.

[[container-transaction-manager]]
== Using Consumer-Initiated Transactions

The `ChainedKafkaTransactionManager` has been deprecated since version 2.7; see the Javadoc for its superclass, `ChainedTransactionManager`, for more information.
Instead, use a `KafkaTransactionManager` in the container to start the Kafka transaction and annotate the listener method with `@Transactional` to start the other transaction.
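
The arrangement could look like the following sketch.
It assumes version 3.2 or later, where the container property is set with `setKafkaAwareTransactionManager` (earlier versions use `setTransactionManager`), and a `DataSourceTransactionManager` bean named `dstm`; the class names, listener id, and topic are placeholders.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Configuration
public class ContainerTxConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTransactionManager<String, String> kafkaTransactionManager) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The container starts the Kafka transaction before it invokes the listener.
        factory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManager);
        return factory;
    }

}

@Component
class ChainedListener {

    // The second transaction starts here and completes when the method returns,
    // before the container commits the Kafka transaction.
    @KafkaListener(id = "chained", topics = "input-topic")
    @Transactional("dstm")
    public void listen(String in) {
        // database updates and KafkaTemplate sends go here
    }

}
```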

See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for an example application that chains JDBC and Kafka transactions.

IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] cannot be combined with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions].
When the listener code throws an exception, the container transaction commit succeeds, and the record is sent to the retryable topic.

[[kafkatemplate-local-transactions]]
== `KafkaTemplate` Local Transactions

You can use the `KafkaTemplate` to execute a series of operations within a local transaction.
The following example shows how to do so:

[source, java]
----
boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});
----

The argument in the callback is the template itself (`this`).
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back.

NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in progress, it is not used.
Instead, a new "nested" transaction is used.

[[transaction-id-prefix]]
== `TransactionIdPrefix`

With `EOSMode.V2` (also known as `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id` across instances, even for consumer-initiated transactions.
In fact, it must be unique on each instance, the same as for producer-initiated transactions.
This property must have a different value on each application instance.

[[transaction-id-suffix-fixed]]
== `TransactionIdSuffix Fixed`

Version 3.2 introduced the `TransactionIdSuffixStrategy` interface to manage the `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy`.
When `maxCache` is set to a value greater than zero, it reuses `transactional.id` suffixes within a fixed range; otherwise, suffixes are generated on the fly by incrementing a counter.
When a transactional producer is requested and all `transactional.id` values are in use, a `NoProducerAvailableException` is thrown.
You can then use a `RetryTemplate` configured to retry that exception, with a suitably configured back off.

[source,java]
----
public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}
----
When `maxCache` is set to 5, the `transactional.id` values range from `my.txid.0` to `my.txid.4`, assuming a `transactionIdPrefix` of `my.txid.`.
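
Retrying on `NoProducerAvailableException` could be sketched as follows.
This assumes that Spring Retry's `RetryTemplate` is on the classpath; the wrapper class `RetryingTxSender`, the five attempts, and the 200 ms fixed back off are illustrative choices, not recommendations.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.NoProducerAvailableException;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical wrapper that retries a local transaction while all suffixes are in use.
final class RetryingTxSender {

    private final KafkaTemplate<String, String> template;

    // Retries only NoProducerAvailableException, waiting 200 ms between attempts.
    private final RetryTemplate retryTemplate = RetryTemplate.builder()
            .maxAttempts(5)
            .fixedBackoff(200)
            .retryOn(NoProducerAvailableException.class)
            .build();

    RetryingTxSender(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    void send(String topic, String value) {
        this.retryTemplate.execute(context -> {
            // Obtaining the transactional producer fails if every transactional.id is in use.
            this.template.executeInTransaction(operations -> operations.send(topic, value));
            return null;
        });
    }

}
```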

IMPORTANT: When using `KafkaTransactionManager` with the `ConcurrentMessageListenerContainer` and enabling `maxCache`, you must set `maxCache` to a value greater than or equal to `concurrency`.
If a `MessageListenerContainer` is unable to acquire a `transactional.id` suffix, it throws a `NoProducerAvailableException`.
When using nested transactions in the `ConcurrentMessageListenerContainer`, increase `maxCache` to accommodate the additional producers that the nested transactions require.
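
To see why `maxCache` has to cover every concurrent user, the bounded reuse of suffixes can be modeled in plain Java.
This is a simplified illustration, not the implementation of `DefaultTransactionIdSuffixStrategy`: the class `BoundedSuffixPool` is hypothetical, and `IllegalStateException` stands in for `NoProducerAvailableException`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a fixed range of reusable transactional.id suffixes.
final class BoundedSuffixPool {

    private final Deque<Integer> free = new ArrayDeque<>();

    BoundedSuffixPool(int maxCache) {
        // Suffixes 0 .. maxCache-1 are the only ones that will ever be handed out.
        for (int i = 0; i < maxCache; i++) {
            this.free.addLast(i);
        }
    }

    // Hands out prefix + suffix, or fails when every suffix is currently in use.
    synchronized String acquire(String prefix) {
        Integer suffix = this.free.pollFirst();
        if (suffix == null) {
            // Stand-in for NoProducerAvailableException in this model.
            throw new IllegalStateException("No transactional.id suffix available");
        }
        return prefix + suffix;
    }

    // Returns a suffix to the pool so that a later caller can reuse it.
    synchronized void release(String prefix, String transactionalId) {
        this.free.addFirst(Integer.parseInt(transactionalId.substring(prefix.length())));
    }

}
```

In this model, a pool of size 2 serves two concurrent callers; a third caller fails until one of the first two releases its suffix.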

[[tx-template-mixed]]
== `KafkaTemplate` Transactional and non-Transactional Publishing

Normally, when a `KafkaTemplate` is transactional (configured with a transaction-capable producer factory), transactions are required.
The transaction can be started by a `TransactionTemplate`, a `@Transactional` method, calling `executeInTransaction`, or by a listener container, when configured with a `KafkaTransactionManager`.
Any attempt to use the template outside the scope of a transaction results in the template throwing an `IllegalStateException`.
Starting with version 2.4.3, you can set the template's `allowNonTransactional` property to `true`.
In that case, the template allows the operation to run without a transaction, by calling the `createNonTransactionalProducer()` method of the `ProducerFactory`; the producer is cached, or thread-bound, as normal for reuse.
See xref:kafka/sending-messages.adoc#producer-factory[Using `DefaultKafkaProducerFactory`].
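
A minimal sketch of mixed publishing with one template follows.
The class `MixedPublishingSketch` and the topic names are hypothetical, and the factory passed in is assumed to be transaction-capable.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical helper showing one template used with and without a transaction.
final class MixedPublishingSketch {

    private MixedPublishingSketch() {
    }

    static KafkaTemplate<String, String> template(ProducerFactory<String, String> transactionalFactory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(transactionalFactory);
        // Without this flag, a send outside a transaction throws an IllegalStateException.
        template.setAllowNonTransactional(true);
        return template;
    }

    static void publish(KafkaTemplate<String, String> template) {
        // No transaction is active here, so a non-transactional producer is used.
        template.send("audit-topic", "sent without a transaction");
        // This send runs in a local transaction.
        template.executeInTransaction(t -> t.send("orders-topic", "sent in a transaction"));
    }

}
```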

[[transactions-batch]]
== Transactions with Batch Listeners

When a listener fails while transactions are being used, the `AfterRollbackProcessor` is invoked to take some action after the rollback occurs.
When using the default `AfterRollbackProcessor` with a record listener, seeks are performed so that the failed record will be redelivered.
With a batch listener, however, the whole batch will be redelivered because the framework doesn't know which record in the batch failed.
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.

When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch: `BatchToRecordAdapter`.
When a container factory with `batchListener` set to true is configured with a `BatchToRecordAdapter`, the listener is invoked with one record at a time.
This enables error handling within the batch, while still making it possible to stop processing the entire batch, depending on the exception type.
A default `BatchToRecordAdapter` is provided; it can be configured with a standard `ConsumerRecordRecoverer` such as the `DeadLetterPublishingRecoverer`.
The following test case configuration snippet illustrates how to use this feature:

[source, java]
----
public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        // The recoverer is called with each failed record; this test simply captures it.
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}
----

